#!/usr/bin/perl
use strict;
use MIME::Base64;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use IO::Handle qw();
use POSIX;

# Option values; filled from the command line and/or the parameter file by getOption()
# and defaulted/validated by checkOption().
my ($DATABASE_NAME,$BACKUP_DB,$PORT,@TABLE_ARRAY,@EX_TABLE_ARRAY,$TABLE_FILE,$EX_TABLE_FILE,@SCHEMA_ARRAY,@EX_SCHEMA_ARRAY,$BATCH_SIZE,$DIRECTORY,$GLOBAL_CONDITION,$TIME_TO,
    $FORCE_REDO,$DATA_ONLY,$NEED_TRUNCATE,$NO_ERROR_TABLE,$ENCODING,$PARAMETER_FILE,$IS_HELP,$VERSION);
# Master-side backup/log locations and the log file handle; assigned in processBackupTimeAndDataDirectory().
my ($MASTER_BACKUP_PATH,$MASTER_LOG_PATH,$MASTER_LOG_FILE,$MASTER_DONE_FILE,$LOG_FILE_HANDLE);
# Fixed file names that are looked up or created inside a backup time directory.
my ($FULL_FILE_NAME,$LOG_FILE_NAME,$LAST_STAT_FILE_NAME,$DONE_FILE_NAME) = ("backup_full.tag","restore_log.log","last_stat.tag","restore_done.log");
# Work queues. NOTE(review): where they are created and who consumes $MSG_QUEUE is not visible in this part of the file.
my ($TASK_QUEUE,$MSG_QUEUE);
# Separators built from control characters. $SQL_SPLIT is the SQL-expression spelling of the same
# characters as $CMD_SPLIT; queryResult() uses $SQL_DELIM as the psql field separator and
# $RECORD_SPLIT as the psql record separator.
my ($SQL_SPLIT,$CMD_SPLIT,$SQL_DELIM,$RECORD_SPLIT) = ('chr(1)||chr(2)||chr(7)',chr(1).chr(2).chr(7),chr(3).chr(4).chr(8),chr(5).chr(6).chr(9).chr(10));
# Backup time flags found on disk, plus the time stamp of the index files written by processIndexDropAndLog().
my ($FULL_BACKUP_TIME,$LAST_BACKUP_TIME,$INDEX_FILE_TIME);
# Thread handles; exitMain() kills and detaches all of them.
my (@TASK_THREAD,$MSG_THREAD,$LOCK_THREAD);
# Index information: "schema.table" => list of [type,name,definition], and the generated create/drop statements.
my (%ALL_INDEX_MAP,@INDEX_CREATE_LIST,@INDEX_DROP_LIST);
# Log lines produced before the log file is open are kept here (see logMessage()).
my (@MESSAGE_CACHE);
# Log state used by logMessage(): 0 = cache only, 1 = flush the cache to the file first, 2 = write straight to the file.
my ($WRITE_TO_FILE) = (0);
# Script name without its directory part, used in help and usage output.
(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
# Last six characters of the zero-padded process id, appended to every log time stamp.
my $MAIN_PID = substr("000000".$$,-6);

# Allowed range and default for -B (number of tables restored concurrently).
my ($BATCH_MAX,$BATCH_DEFAULT,$BATCH_MIN) = (32,5,1);

# Catalog query used by processTableArray() to list restore candidates.
# "all_parent" selects relations with relkind 'r' and relstorage <> 'x' in schemas with
# oid > 16384 or named public, skipping pg_temp_*/pg_toast_temp_* schemas and every relation
# that is itself a partition child. "all_child" keeps, per partitioned parent, the child
# partitions at the highest parlevel (rank 1 when ordered by parlevel desc).
# Each output row is: parent schema, parent name, child schema, child name, all base64
# encoded with the line feeds removed; the child columns are empty for parents without children.
# Rows are ordered by the decode(...) expression over relpages, descending.
# NOTE(review): relstorage 'x' presumably marks external tables in the Greenplum catalog - confirm.
my $GET_PARENT_CHILD_LIST_SQL = q{with all_parent as (
    select c.oid,nspname,relname,relpages
    from pg_class c, pg_namespace n
        where c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
        and c.oid not in(select parchildrelid from pg_partition_rule)
), all_child as (
    select parrelid,parchildrelid,nspname,relname,relpages from(
        select p.parrelid,pr.parchildrelid,p.parlevel,n.nspname,c.relname,
            rank() over(partition by parrelid order by parlevel desc) rank,relpages
        from pg_partition p, pg_partition_rule pr, pg_class c, pg_namespace n
        where pr.paroid = p.oid and pr.parchildrelid = c.oid and c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    ) t where rank = 1
)
select replace(encode(textsend(x.nspname),'base64'),chr(10),''),
    replace(encode(textsend(x.relname),'base64'),chr(10),''),
    replace(encode(textsend(y.nspname),'base64'),chr(10),''),
    replace(encode(textsend(y.relname),'base64'),chr(10),'')
from all_parent x
left join all_child y on x.oid = y.parrelid
order by decode(y.relpages,y.relpages,x.relpages) desc;};

# Variant of the parent/child listing query that processTableArray() runs when --no-error is set.
# It is the same query with one extra filter on the parent side: relations whose columns,
# joined in attnum order, are exactly
# 'cmdtime,relname,filename,linenum,bytenum,errmsg,rawdata,rawbytes' are left out
# (the help text describes these as external tables' error tables).
# Output columns and ordering are the same as in the query without this filter.
my $GET_PARENT_CHILD_LIST_NO_ERROR_SQL = q{with all_parent as (
    select c.oid,nspname,relname,relpages
    from pg_class c, pg_namespace n
        where c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
        and c.oid not in(select parchildrelid from pg_partition_rule)
        and c.oid not in(select attrelid from pg_attribute
            where attnum>0 group by 1 having string_agg(attname,',' order by attnum) = 'cmdtime,relname,filename,linenum,bytenum,errmsg,rawdata,rawbytes'
        )
), all_child as (
    select parrelid,parchildrelid,nspname,relname,relpages from(
        select p.parrelid,pr.parchildrelid,p.parlevel,n.nspname,c.relname,
            rank() over(partition by parrelid order by parlevel desc) rank,relpages
        from pg_partition p, pg_partition_rule pr, pg_class c, pg_namespace n
        where pr.paroid = p.oid and pr.parchildrelid = c.oid and c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    ) t where rank = 1
)
select replace(encode(textsend(x.nspname),'base64'),chr(10),''),
    replace(encode(textsend(x.relname),'base64'),chr(10),''),
    replace(encode(textsend(y.nspname),'base64'),chr(10),''),
    replace(encode(textsend(y.relname),'base64'),chr(10),'')
from all_parent x
left join all_child y on x.oid = y.parrelid
order by decode(y.relpages,y.relpages,x.relpages) desc;};
# Catalog query used by getIndexInfo(): one row per index in the user schemas
# (namespace oid > 16384 or oid 2200).
# Output columns: schema (base64), table (base64), type ('p' when the index backs a primary key
# constraint, otherwise 'i'), constraint-or-index name (base64), index definition (base64).
# The primary-key join also matches on the index name. Joining on (schema, table) alone tagged
# EVERY index of a table that has a primary key as 'p' under the constraint name, which
# produced duplicate DROP CONSTRAINT statements and never dropped the secondary indexes.
# NOTE(review): this relies on the primary key's index carrying the constraint's name - confirm
# for databases where such an index was renamed independently.
my $GET_INDEX_INFO_LIST_SQL = q{select
    replace(encode(textsend(schemaname),'base64'),chr(10),''),
    replace(encode(textsend(tablename),'base64'),chr(10),''),
    decode(conname, null, 'i' , 'p'),
    replace(encode(textsend(coalesce(conname,indexname)),'base64'),chr(10),''),
    replace(encode(textsend(indexdef),'base64'),chr(10),'')
from (
    select schemaname,tablename,indexname,indexdef
    from pg_indexes where schemaname in(select nspname from pg_namespace where oid > 16384 or oid=2200)
) x left join (
    select n.nspname, c.relname, con.conname
    from pg_constraint con, pg_class c, pg_namespace n
    where con.conrelid = c.oid and con.connamespace = n.oid and contype = 'p'
) y on (x.schemaname, x.tablename, x.indexname) = (y.nspname, y.relname, y.conname);};

# Text of a bash helper script; createCleverCommand() writes it to /tmp/fifo/gpmcrestorecat.sh
# on the segment hosts.
# Script arguments: $1 base directory, $2 backup time flag, $3 port, $4 "database.schema.table",
# $5 "1" when the data lives under <directory>/gpseg<content>/db_dumps.
# The script asks the instance on the given port for its gp_contentid (utility-mode psql),
# builds the data file name <db>^<content>^<schema>.<table> and streams the file to stdout:
# as is, through gunzip for a .gz file, or through unzstd for a .zst file.
# It exits with status 1 when none of the three files exists; with pipefail set, a failing
# cat/gunzip/unzstd pipeline makes the script exit with that pipeline's status.
my $STR_CLEVER_CMD = q[#!/bin/bash
cd $(cd "$(dirname "$0")"; pwd)
DIRC=$1
TIME=$2
PORT=$3
FLNM=$4
SELF=$5
DBNM=${FLNM%%.*}
FLNM=${FLNM#*.}
SCMA=${FLNM%%.*}
TBNM=${FLNM#*.}
CNTT=`PGOPTIONS='-c gp_session_role=utility' psql -qtAX -d postgres -p $PORT -c "show gp_contentid"`
FILE=$DIRC/db_dumps/$TIME/$DBNM^$CNTT^$SCMA.$TBNM
if [ "$SELF" == "1" ];then
FILE=$DIRC/gpseg$CNTT/db_dumps/$TIME/$DBNM^$CNTT^$SCMA.$TBNM
fi
set -o pipefail
if [ -e $FILE ];then
    cat $FILE
elif [ -e $FILE.gz ];then
    cat $FILE.gz|gunzip
elif [ -e $FILE.zst ];then
    cat $FILE.zst|unzstd
else
    echo "File $FILE not exists" >&2
    exit 1
fi
ERR=$?
if [ $ERR -ne 0 ];then
exit $ERR
fi
];

# Definitions used by checkExecuteFunction() to verify and (re)create gp_toolkit.gp_restore_execute.

# Returns the comma-separated argument type names of gp_toolkit.gp_restore_execute.
my $EXECUTE_FUNCTION_ARG_CHECK_SQL = q{select string_agg(t.typname,',' order by a.idx) from(
    select proargtypes typs,generate_series(0,array_upper(proargtypes,1)) idx from pg_proc
        where proname='gp_restore_execute' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit')
    )a,pg_type t where t.oid=a.typs[a.idx];};
# Argument type list the installed function must report.
my $EXECUTE_FUNCTION_ARG_CHECK_VALUE = "varchar";
# Returns the md5 of the installed function's source text.
my $EXECUTE_FUNCTION_SRC_CHECK_SQL = q{select md5(prosrc) from pg_proc
    where proname='gp_restore_execute' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit');};
# Expected md5 of the function source.
# NOTE(review): presumably the md5 of the body in $EXECUTE_FUNCTION_DDL below - recompute it whenever that body changes.
my $EXECUTE_FUNCTION_SRC_CHECK_MD5 = "5c9dd8af64ef05f79f06df02e39f3cf2";
# DDL of the plpythonu function: it runs the given command string with
# commands.getstatusoutput, returns its output on status 0 and raises plpy.error otherwise.
my $EXECUTE_FUNCTION_DDL = q#create or replace function gp_toolkit.gp_restore_execute(command varchar) returns text as $$
import commands
cmd_str=command
try:
    (cod,val)=commands.getstatusoutput(cmd_str)
    if cod!=0:
        plpy.error("%s:%s" % (cod,val))
    else:
        return val
except Exception, e:
    plpy.error(str(e))
$$ language plpythonu;
#;

# Help text printed by getOption() for -h/--help.
# It is an interpolating string: $CMD_NAME and the $BATCH_* limits are filled in when it is built.
# Corrected against the option table in getOption(): the -T example now uses -T (it showed -S),
# and -S is described as excluding schemas (it said "include").
my $HELP_MESSAGE = qq#COMMAND NAME: $CMD_NAME
************************************************************************************************
Restore a Greenplum database backup data to database by external table.

Developed by Miao Chen

Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn

************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME --database database name
    [--backup-database database name]
    [--port database port]
    [-t <schema.relation> [-t <schema.relation>] ...]
    [-T <schema.relation> [-T <schema.relation>] ...]
    [-f file include table name]
    [-F file exclude table name]
    [-s <schema> [-s <schema>] ...]
    [-S <schema> [-S <schema>] ...]
    [-B batch_size]
    [--directory directory]
    [--where condition]
    --time-to time 
    [--force-redo]
    [-a]
    [--truncate]
    [--no-error]
    [--encoding encoding]
    [--parameter-file filename]
    [-h|--help]
    [--version]
*****************************************************
DESCRIPTION
*****************************************************
The $CMD_NAME utility is used to restore the Greenplum Database backup data.
When you start $CMD_NAME,the utility will restore every table from local disk.

Restore is running in parallel.
One table's restore is running at once on all Segment Instance.
At any time,there are several tables are restore.
*****************************************************
OPTIONS
*****************************************************

--database <database>

  Restore table in this database.
  This option is required.
  If you not specify -t and -f options, all user tables in this database will be restore.
  eg.:
  --database postgres

--backup-database <database name>

  Restore backup data from a different database with this name.

--port <master port>

  Database port number, If not specified, the default is 5432.
  eg.
  --port 5433

-t <schema.tablename>

  A table in database to be restore.
  The fully qualified table name must be specified.
  This option can be specified multiple times to include multiple tables.
  If the table does not exist, $CMD_NAME ignore it and log a notice message.

  eg.
  -t schema.relation

-T <schema.relation>

  Do not restore this table.
  This option can be specified multiple times to exclude multiple tables.
  eg.:
  -T public.ex_a -T public.ex_b

-f <table-file>

  The location and name of file containing list of fully qualified table names to be restore.
  In the text file, you should specify a single fully qualified table per line.
  If the table does not exist, $CMD_NAME ignore it and log a notice message.
  You should specify the qualified table name as format:
  schema.relation[;condition];
  This example lists 4 tables should be restore.
  public.customer;time>='2016-01-01'
  public.customer_v;district_code='021'
  myschema.orders
  his.orders_his

-F <table-file>

  The location and name of file containing list of fully qualified table names to be exclude.
  In the text file, you should specify a single fully qualified table per line.
  If the table does not exist, $CMD_NAME ignore it.
  You should specify the qualified table name as format:
  schema.relation

-s <schema name>

  Restore tables in this schema.
  This option can be specified multiple times to include multiple schemas.
  eg.:
  -s public -s myschema

-S <schema name>

  Do not restore tables in this schema.
  This option can be specified multiple times to exclude multiple schemas.
  eg.:
  -S public -S myschema

-B <batch_size>

  Sets the maximum number of tables that $CMD_NAME concurrently restore database.
  If not specified, the default is $BATCH_DEFAULT.
  The max is $BATCH_MAX.
  The min is $BATCH_MIN.

--directory <directory>

  Restore tables data from this directory.
  All segments(include Master) must have this directory, and gpadmin user have authority.
  If not specified, $CMD_NAME will restore from instance's data directory / db_dumps /
  eg.:
  --directory /data

--where

  With condition when restore a table.
  This condition is gloabal condition, condition in -f file will over write this global condition.
  eg.:
  --where "data_date='2016-01-01'"

--time-to <time>

  Restore find backup data file until this time.

-a

  Restore only data, no ddls will be restore.

--force-redo

   Restore will check the success log file in the current restore directory.
   Default, restore will ignore the table which found in the success log file.
   With this parameter, restore will ignore the success log file.

--truncate

  Truncate table data before start restore table data.

--no-error

  Restore table range not include external table's error table.

--encoding <encoding>

  Restore backup data with this encoding, if the backup data have a diffrent encoding from UTF8.

--parameter-file <parameter_file>

  Specify the parameter file.
  Format:
  name=value
  Like:
  f=restore_table_list
  database=postgres
  ...

-h|--help

  Displays the online help.

--version

  Displays the command version.

Examples:
$CMD_NAME --database postgres --time-to 20140825082500 --port 4371 --truncate -a
$CMD_NAME --database postgres --time-to 20140825082500 --port 4371 --truncate --force-redo -a
$CMD_NAME -h
#;

# Print one message and return the text that was printed.
#   $flag    - message level; "RAW" prints the message untouched, "ERROR" goes to STDERR,
#              every other level goes to STDOUT.
#   $message - message text.
# Non-RAW messages are wrapped as "<time stamp><pid>-[<flag>]-:<message>\n".
sub printMessage{
    my ($level,$text) = @_;
    unless("RAW" eq $level){
        my $stamp = strftime("%Y%m%d:%H:%M:%S.",localtime).$MAIN_PID;
        $text = "$stamp-[$level]-:$text\n";
    }
    my $target = "ERROR" eq $level ? \*STDERR : \*STDOUT;
    print {$target} $text;
    return $text;
}
# Print a message through printMessage() and record it for the log file.
# While $WRITE_TO_FILE is neither 1 nor 2 the formatted line is only cached. On the first call
# with $WRITE_TO_FILE == 1 the cached lines are written out, the state moves to 2, and from
# then on every line goes straight to $LOG_FILE_HANDLE.
sub logMessage{
    my ($level,$text) = @_;
    my $line = printMessage($level,$text);
    if($WRITE_TO_FILE != 1 && $WRITE_TO_FILE != 2){
        push @MESSAGE_CACHE,$line;
    }else{
        if($WRITE_TO_FILE == 1){
            # First write after the log file was opened: flush what was collected so far.
            for my $cached(@MESSAGE_CACHE){
                print {$LOG_FILE_HANDLE} $cached;
            }
            @MESSAGE_CACHE = ();
            $WRITE_TO_FILE = 2;
        }
        print {$LOG_FILE_HANDLE} $line;
    }
}
# Terminate the program with the given exit code.
# Every known thread (message, task and lock threads) is sent KILL and detached first,
# and the log file handle is closed when it is open.
sub exitMain{
    my ($code) = @_;
    $MSG_THREAD->kill('KILL')->detach() if("" ne $MSG_THREAD);
    for my $worker(@TASK_THREAD){
        $worker->kill('KILL')->detach();
    }
    $LOCK_THREAD->kill('KILL')->detach() if("" ne $LOCK_THREAD);
    close($LOG_FILE_HANDLE) if(defined $LOG_FILE_HANDLE);
    exit $code;
}
# Log a fatal error, print the one-line usage hint and end the program with exit code 1.
sub errorMessage{
    my ($reason) = @_;
    logMessage("ERROR",$reason);
    print "Usage: ".$CMD_NAME." [-h|--help] [options]\n";
    exitMain(1);
}
# Return a copy of the string without leading and trailing whitespace.
sub trim{
    my $text = shift;
    $text =~ s/^\s+//;
    $text =~ s/\s+$//;
    return $text;
}
# Base64-encode a string as a single line (no line feeds in the result).
sub encode{
    my ($plain) = @_;
    # An empty end-of-line argument makes encode_base64 emit no line breaks at all.
    return encode_base64($plain,"");
}
# Decode a base64 string and return the original text.
sub decode{
    my ($encoded) = @_;
    return decode_base64($encoded);
}
# Return 1 when at least one of the given strings contains a non-word character
# (anything outside \w), otherwise 0.
sub illegal{
    return( (grep { /\W/ } @_) ? 1 : 0 );
}
# Make carriage returns, line feeds and tabs visible by replacing them with
# the two-character sequences \r, \n and \t.
sub escape{
    my ($text) = @_;
    my %visible_for = ("\r" => '\r', "\n" => '\n', "\t" => '\t');
    $text =~ s/([\r\n\t])/$visible_for{$1}/g;
    return $text;
}
# Run SQL through psql (started from a shell here-document) and return its output.
#   $query_sql   - SQL text fed to psql on standard input.
#   $return_flag - "CV":     return (exit code, output text) and never abort;
#                  "Scalar": return the output records joined with "\n";
#                  otherwise: return a list of array refs, one per record, split on $SQL_DELIM.
# Except for "CV", a non-zero psql exit code ends the program through errorMessage().
# psql's stderr is merged into the captured output (2>&1).
sub queryResult{
    my ($query_sql,$return_flag) = @_;
    # Database, port and session options are handed over as environment variables of the command.
    my $CMDS = "PGDATABASE=$DATABASE_NAME PGPORT=$PORT PGOPTIONS='-c optimizer=off -c client_encoding=UTF8' ";
    # readpipe in list context cuts its output at $/, so use the same separator psql gets via -R.
    local $/ = $RECORD_SPLIT;
    # The here-document delimiter is quoted, so the shell does not expand anything inside the SQL text.
    $CMDS = $CMDS."psql -R '$/' -tAXF '$SQL_DELIM' -v ON_ERROR_STOP=1 2>&1 <<'END_OF_SQL'\n";
    $CMDS = $CMDS.$query_sql."\n";
    $CMDS = $CMDS."END_OF_SQL\n";
    my @result = readpipe($CMDS);
    my $return_code = $? >> 8;
    # Strip the trailing record separator from every record.
    chomp(@result);
    # Switch to a plain line feed to drop the one left at the end of the last record.
    # NOTE(review): presumably psql ends its output with a newline instead of the record separator - confirm.
    local $/ = chr(10);
    chomp($result[-1]) if (@result > 0);
    return ($return_code,join("\n",@result)) if ("CV" eq $return_flag);
    errorMessage(join("\n",@result)) if ($return_code);
    return join("\n",@result) if ("Scalar" eq $return_flag);
    my @return_list = ();
    for my $row(@result){
        push @return_list,[split(/$SQL_DELIM/,$row)];
    }
    return @return_list;
}
# Read the options from the command line and, when --parameter-file is given, from that file.
# Unknown leftover arguments end the program with an error; --help and --version print and exit.
# Parameter file lines have the form name=value. A single-valued option from the file is only
# used when the command line left it empty, list options (t, T, s, S) are appended, and the
# switches (force-redo, a, truncate, no-error) are turned on whatever value the line carries.
# Finally trailing slashes/whitespace are removed from the directory.
sub getOption{
    GetOptions(
        'database:s'        => \$DATABASE_NAME,
        'backup-database:s' => \$BACKUP_DB,
        'port:i'            => \$PORT,
        't:s'               => \@TABLE_ARRAY,
        'T:s'               => \@EX_TABLE_ARRAY,
        'f:s'               => \$TABLE_FILE,
        'F:s'               => \$EX_TABLE_FILE,
        's:s'               => \@SCHEMA_ARRAY,
        'S:s'               => \@EX_SCHEMA_ARRAY,
        'B:i'               => \$BATCH_SIZE,
        'directory:s'       => \$DIRECTORY,
        'where:s'           => \$GLOBAL_CONDITION,
        'time-to:i'         => \$TIME_TO,
        'force-redo!'       => \$FORCE_REDO,
        'a!'                => \$DATA_ONLY,
        'truncate!'         => \$NEED_TRUNCATE,
        'no-error!'         => \$NO_ERROR_TABLE,
        'encoding:s'        => \$ENCODING,
        'parameter-file:s'  => \$PARAMETER_FILE,
        'h|help!'           => \$IS_HELP,
        'version!'          => \$VERSION,
    );
    errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help") if(@ARGV != 0);
    if($IS_HELP){
        print $HELP_MESSAGE;
        exitMain(0);
    }
    if($VERSION){
        print "$CMD_NAME 1.5\n";
        exitMain(0);
    }
    if("" ne $PARAMETER_FILE){
        # Where each parameter-file name is stored, grouped by how its value is applied.
        my %scalar_for = (
            'database'  => \$DATABASE_NAME,  'backup-database' => \$BACKUP_DB,
            'port'      => \$PORT,           'f'               => \$TABLE_FILE,
            'F'         => \$EX_TABLE_FILE,  'B'               => \$BATCH_SIZE,
            'directory' => \$DIRECTORY,      'where'           => \$GLOBAL_CONDITION,
            'time-to'   => \$TIME_TO,        'encoding'        => \$ENCODING,
        );
        my %list_for = (
            't' => \@TABLE_ARRAY,  'T' => \@EX_TABLE_ARRAY,
            's' => \@SCHEMA_ARRAY, 'S' => \@EX_SCHEMA_ARRAY,
        );
        my %switch_for = (
            'force-redo' => \$FORCE_REDO,    'a'        => \$DATA_ONLY,
            'truncate'   => \$NEED_TRUNCATE, 'no-error' => \$NO_ERROR_TABLE,
        );
        for my $entry(readLineFromFile($PARAMETER_FILE)){
            my ($key,$value) = split(/=/,$entry,2);
            $key = trim($key);
            $value = trim($value);
            if(exists $scalar_for{$key}){
                # The command line wins: only fill in what is still empty.
                ${$scalar_for{$key}} = $value if("" eq ${$scalar_for{$key}});
            }elsif(exists $list_for{$key}){
                push @{$list_for{$key}},$value;
            }elsif(exists $switch_for{$key}){
                ${$switch_for{$key}} = 1 if("" eq ${$switch_for{$key}});
            }
        }
    }
    $DIRECTORY =~ s{/*\s*$}{};
}
# Validate the option values, apply defaults and log the effective settings.
# Ends the program (through errorMessage) when --database or --time-to is missing or when a
# database name contains non-word characters. Defaults: port 5432, batch size $BATCH_DEFAULT
# (also used when -B is outside [$BATCH_MIN,$BATCH_MAX]), backup database = database,
# encoding UTF8. A non-empty --where value gets the prefix " where ".
sub checkOption{
    errorMessage("Please specify parameter: --database") if("" eq $DATABASE_NAME);
    errorMessage("Database name is not legal:[".escape($DATABASE_NAME)."]") if(illegal($DATABASE_NAME));
    $PORT = '5432' if("" eq $PORT);
    errorMessage("Please specify parameter: --time-to") if("" eq $TIME_TO);
    if("" eq $BATCH_SIZE || $BATCH_SIZE > $BATCH_MAX || $BATCH_SIZE < $BATCH_MIN){
        logMessage("NOTICE","Not specify or out of limit, use default($BATCH_DEFAULT): -B");
        $BATCH_SIZE = $BATCH_DEFAULT;
    }
    if("" ne $GLOBAL_CONDITION){
        $GLOBAL_CONDITION = " where ".$GLOBAL_CONDITION;
    }else{
        $GLOBAL_CONDITION = "";
    }
    $BACKUP_DB = $DATABASE_NAME if("" eq $BACKUP_DB);
    errorMessage("Backup database name is not legal:[".escape($BACKUP_DB)."]") if(illegal($BACKUP_DB));
    $ENCODING = "UTF8" if("" eq $ENCODING);
    # Record the effective option values, one log line per group.
    my @summary = (
        "Option values: --database $DATABASE_NAME --backup-database $BACKUP_DB --port $PORT",
        "Option values: -t ".join(' -t ',@TABLE_ARRAY)." -T ".join(' -T ',@EX_TABLE_ARRAY),
        "Option values: -f $TABLE_FILE -F $EX_TABLE_FILE -s ".join(' -s ',@SCHEMA_ARRAY)." -S ".join(' -S ',@EX_SCHEMA_ARRAY),
        "Option values: -B $BATCH_SIZE --directory $DIRECTORY --where $GLOBAL_CONDITION",
        "Option values: --time-to $TIME_TO --force-redo $FORCE_REDO -a $DATA_ONLY --truncate $NEED_TRUNCATE",
        "Option values: --no-error $NO_ERROR_TABLE --encoding $ENCODING --parameter-file $PARAMETER_FILE",
    );
    for my $summary_line(@summary){
        logMessage("INFO",$summary_line);
    }
}
# Locate the backup to restore from and open the restore log.
# The master backup path is <data_directory>/db_dumps/ when no --directory was given,
# otherwise <directory>/gpseg-1/db_dumps/. Its entries are walked from the newest time flag
# downwards: the first one that has a stat file and is not later than --time-to becomes
# $LAST_BACKUP_TIME, and the first such one that also holds the full-backup tag becomes
# $FULL_BACKUP_TIME. Without a full backup the program ends with an error.
# Afterwards the log file is opened for appending, the success log file is touched and
# logMessage() is told to start writing to the file.
sub processBackupTimeAndDataDirectory{
    if("" eq $DIRECTORY){
        $MASTER_BACKUP_PATH = queryResult("show data_directory;","Scalar")."/db_dumps/";
    }else{
        $MASTER_BACKUP_PATH = $DIRECTORY."/gpseg-1/db_dumps/";
    }
    my $listing = readpipe("ls -1L $MASTER_BACKUP_PATH|sort -rn");
    for my $time_flag(split(/\n/,$listing)){
        my $backup_dir = $MASTER_BACKUP_PATH.$time_flag."/";
        my $stat_file = $backup_dir.$LAST_STAT_FILE_NAME;
        next unless(-f $stat_file);
        next if($time_flag > $TIME_TO);
        if($LAST_BACKUP_TIME eq ""){
            $LAST_BACKUP_TIME = $time_flag;
            logMessage("INFO","Find last backup time flag: $time_flag");
        }
        my $full_file = $backup_dir.$FULL_FILE_NAME;
        next unless(-f $full_file);
        $FULL_BACKUP_TIME = $time_flag;
        logMessage("INFO","Find last full backup time flag: $time_flag");
        last;
    }
    errorMessage("Can't find full backup time flag") if("" eq $FULL_BACKUP_TIME);
    $MASTER_LOG_PATH = $MASTER_BACKUP_PATH.$LAST_BACKUP_TIME;
    $MASTER_LOG_FILE = $MASTER_LOG_PATH."/".$LOG_FILE_NAME;
    $MASTER_DONE_FILE = $MASTER_LOG_PATH."/".$DONE_FILE_NAME;
    # printMessage (not logMessage) is used on purpose here in the original: these two lines are not logged.
    my $done_notice = -e $MASTER_DONE_FILE
        ? "Restore success log file: $MASTER_DONE_FILE"
        : "Try to create restore success log file: $MASTER_DONE_FILE";
    printMessage("NOTICE",$done_notice);
    if(open($LOG_FILE_HANDLE,">>",$MASTER_LOG_FILE)){
        $LOG_FILE_HANDLE->autoflush(1);
    }else{
        errorMessage("Can't open file: $MASTER_LOG_FILE");
    }
    system("touch $MASTER_DONE_FILE");
    $WRITE_TO_FILE = 1;
}
# Make sure the plpythonu language exists in the database; create it when the lookup returns nothing.
sub checkLanguage{
    my $found = queryResult("select 1 from pg_language where lanname='plpythonu';","Scalar");
    if("" ne $found){
        logMessage("INFO","Language plpythonu is OK");
    }else{
        logMessage("NOTICE","No language plpythonu exists in database, create it");
        queryResult("create language plpythonu;");
    }
}
# Make sure gp_toolkit.gp_restore_execute exists with the expected argument list and source.
# When either the argument types or the md5 of the source differ from the expected values,
# the function is dropped (using the argument list that was found) and created again.
sub checkExecuteFunction{
    my $found_md5 = queryResult($EXECUTE_FUNCTION_SRC_CHECK_SQL,"Scalar");
    my $found_args = queryResult($EXECUTE_FUNCTION_ARG_CHECK_SQL,"Scalar");
    my $up_to_date = ($EXECUTE_FUNCTION_ARG_CHECK_VALUE eq $found_args && $EXECUTE_FUNCTION_SRC_CHECK_MD5 eq $found_md5);
    if($up_to_date){
        logMessage("INFO","Execute function gp_toolkit.gp_restore_execute is OK");
    }else{
        logMessage("NOTICE","No restore function or need replace, create or replace it");
        queryResult("drop function if exists gp_toolkit.gp_restore_execute($found_args);");
        queryResult($EXECUTE_FUNCTION_DDL);
    }
}
# Install the helper script $STR_CLEVER_CMD as /tmp/fifo/gpmcrestorecat.sh.
# One primary segment that is up is picked per host (lowest content id), and
# gp_toolkit.gp_restore_execute is run on those segments with a shell command that
# recreates the script from its base64 form.
sub createCleverCommand{
    my $segment_ids = queryResult(qq{select string_agg(id::text,',') from (select min(content) id from gp_segment_configuration where status='u' and role='p' and content>-1 group by hostname)x;},"Scalar");
    # The script travels base64 encoded so that its quotes and newlines survive the SQL literal.
    my $script_base64 = encode_base64($STR_CLEVER_CMD);
    my $install_cmd = 'mkdir -p /tmp/fifo;echo "'.$script_base64.'"|base64 -d > /tmp/fifo/gpmcrestorecat.sh';
    my $install_sql = qq{select gp_toolkit.gp_restore_execute('$install_cmd') from gp_dist_random('gp_id') where gp_segment_id in($segment_ids);};
    queryResult($install_sql,"Scalar");
}
# Read a text file and return its meaningful lines.
#   $file_path - file to read.
# Returns the list of lines with surrounding whitespace removed; empty lines and lines
# starting with '#' are left out. Ends the program through errorMessage() when the file
# does not exist or cannot be opened.
sub readLineFromFile{
    my ($file_path) = @_;
    if(!-e $file_path){
        errorMessage("No file exists named: $file_path");
    }
    # Lexical handle instead of the bareword FILE: it cannot clash with any other
    # handle of that name and is released when it goes out of scope.
    my $file_handle;
    if(!open($file_handle,"<",$file_path)){
        errorMessage("Can't open file: $file_path");
    }
    my @line_list = ();
    while(my $line = <$file_handle>){
        $line = trim($line);
        if(!($line =~ /^#/) && "" ne $line){
            push @line_list,$line;
        }
    }
    close $file_handle;
    return @line_list;
}
# Read a table list file and return its entries.
#   $file_name - list file; an empty name yields an empty list.
# Every returned element is an array ref [table, condition], taken from a line of the form
# "table;condition" (the condition part is optional). Comment lines (#) and empty lines are
# skipped. Ends the program through errorMessage() when the file does not exist.
sub getTableFromFile{
    my ($file_name) = @_;
    my @entries;
    return @entries if("" eq $file_name);
    errorMessage("File not exists named:$file_name") if(!-e $file_name);
    for my $raw_line(readLineFromFile($file_name)){
        my $entry = trim($raw_line);
        next if($entry =~ /^#/ || "" eq $entry);
        my ($name,$condition) = split(/;/,$entry,2);
        push @entries,[trim($name),trim($condition)];
    }
    return @entries;
}
# Restore DDLs, objects and authorizations with gpddlrestore, unless -a (data only) was given.
# The DDL file is <master backup path><last backup time>/<backup database>.ddl.txt.
# Ends the program through errorMessage() when gpddlrestore exits with a non-zero code.
# The command's stdout and stderr are captured so that the error message shows what went
# wrong; before, everything was redirected to /dev/null and the reported output was always
# empty. The empty "()" prototype was dropped; calls without arguments work unchanged.
sub restoreDdl{
    return if($DATA_ONLY);
    logMessage("INFO","Restore DDLs and OBJECTs and AUTHORIZATIONs");
    my $ddl_file = $MASTER_BACKUP_PATH."$LAST_BACKUP_TIME/$BACKUP_DB.ddl.txt";
    my $output = readpipe("gpddlrestore --database $DATABASE_NAME --port $PORT --object-bit FFFF -f $ddl_file 2>&1");
    my $return_code = $? >> 8;
    if($return_code){
        errorMessage("Restore DDLs occur error:\n".$output);
    }
}
# Build the final list of tables to restore and store it in @TABLE_ARRAY as
# [schema, relation, condition] entries.
# Candidates come from the catalog (parent tables and their partition children). They are
# reduced by the include lists (-t, -f, -s), the exclude lists (-T, -F, -S) and, unless
# --force-redo is set, by the tables already recorded in the success log file for this
# database. For a partitioned table the child partition is queued, otherwise the table
# itself. A condition given for the child in the -f file takes precedence over the parent's one.
sub processTableArray{
    my @relation_rows = queryResult($GET_PARENT_CHILD_LIST_SQL);
    my $full_size = @relation_rows;
    if($NO_ERROR_TABLE){
        @relation_rows = queryResult($GET_PARENT_CHILD_LIST_NO_ERROR_SQL);
        my $ignore_size = $full_size - @relation_rows;
        logMessage("INFO","Ignore error table number is: [$ignore_size]");
    }
    # The query returns base64 text; turn every row into plain names.
    @relation_rows = map {
        my ($enc_p_scma,$enc_p_rel,$enc_c_scma,$enc_c_rel) = @$_;
        [decode($enc_p_scma),decode($enc_p_rel),decode($enc_c_scma),decode($enc_c_rel)];
    } @relation_rows;
    # True when the schema or the relation part of "schema.relation" holds a non-word character.
    my $has_bad_name = sub{
        my ($scma,$rel) = split(/\./,$_[0],2);
        return illegal($scma,$rel);
    };
    #####################################################################################################
    # Tables to include: name => condition.
    my %include_where = ();
    for my $name(@TABLE_ARRAY){
        if($has_bad_name->($name)){
            logMessage("WARN","Table name specify is not legal:[$name]");
            next;
        }
        $include_where{$name} = "";
    }
    for my $entry(getTableFromFile($TABLE_FILE)){
        my ($name,$condition) = @$entry;
        if($has_bad_name->($name)){
            logMessage("WARN","Table name in file is not legal:[$name]");
            next;
        }
        $include_where{$name} = $condition;
    }
    #####################################################################################################
    # Tables to exclude.
    my %excluded = ();
    for my $name(@EX_TABLE_ARRAY){
        if($has_bad_name->($name)){
            logMessage("WARN","Ex table name specify is not legal:[$name]");
            next;
        }
        $excluded{$name} = "";
    }
    for my $entry(getTableFromFile($EX_TABLE_FILE)){
        my ($name) = @$entry;
        if($has_bad_name->($name)){
            logMessage("WARN","Table name in file is not legal:[$name]");
            next;
        }
        $excluded{$name} = "";
    }
    if(!$FORCE_REDO){
        # Tables already restored for this database count as excluded.
        logMessage("INFO","Success log file: $MASTER_DONE_FILE");
        for my $done_line(readLineFromFile($MASTER_DONE_FILE)){
            my (undef,$full_name) = split(/;/,$done_line,2);
            my ($dbname,$name) = split(/\./,$full_name,2);
            next if($dbname ne $DATABASE_NAME);
            $excluded{$name} = "";
        }
    }
    #####################################################################################################
    my %include_schema = map { $_ => "" } @SCHEMA_ARRAY;
    my %exclude_schema = map { $_ => "" } @EX_SCHEMA_ARRAY;
    #####################################################################################################
    my $only_listed = 0;
    if(%include_where || %include_schema || "" ne $TABLE_FILE){
        $only_listed = 1;
        logMessage("INFO","Restore table from parameter or file in database: $DATABASE_NAME");
    }else{
        logMessage("INFO","Restore all user table in database: $DATABASE_NAME");
    }
    @TABLE_ARRAY = ();
    for my $relation(@relation_rows){
        my ($p_scma,$p_rel,$c_scma,$c_rel) = @$relation;
        my $p_table = "$p_scma.$p_rel";
        my $c_table = "$c_scma.$c_rel";
        if(illegal($p_scma,$p_rel,$c_scma,$c_rel)){
            logMessage("FAILED","Table name is not legal:[".escape($c_table eq "." ? $p_table : $c_table)."]");
            next;
        }
        next if(exists $excluded{$p_table} || exists $excluded{$c_table} || exists $exclude_schema{$p_scma});
        my $listed = (exists $include_where{$p_table} || exists $include_where{$c_table} || exists $include_schema{$p_scma});
        next if($only_listed && !$listed);
        my $where = $include_where{$p_table};
        if(exists $include_where{$c_table} && "" ne $include_where{$c_table}){
            $where = $include_where{$c_table};
        }
        my $target = "" ne $c_scma ? [$c_scma,$c_rel,$where] : [$p_scma,$p_rel,$where];
        push @TABLE_ARRAY,$target;
    }
    #####################################################################################################
}
# Find, for every table of the backup database, the newest backup time that holds it.
# The time directories between the full backup and the last backup (both included) are
# visited from newest to oldest; the second ';'-separated field of each stat file line is
# read as "database.schema.table". Returns a hash "schema.table" => time flag.
sub getLastBackupStatFromFile{
    my %newest_time_of = ();
    my $listing = readpipe("ls -1L $MASTER_BACKUP_PATH|sort -rn");
    for my $time_flag(split(/\n/,$listing)){
        my $stat_file = $MASTER_BACKUP_PATH.$time_flag."/".$LAST_STAT_FILE_NAME;
        next unless(-f $stat_file);
        next if($time_flag > $LAST_BACKUP_TIME || $time_flag < $FULL_BACKUP_TIME);
        for my $record(readLineFromFile($stat_file)){
            my $full_name = (split(/;/,$record))[1];
            my ($dbname,$table) = split(/\./,$full_name,2);
            next if($dbname ne $BACKUP_DB);
            # Newer directories come first, so the first hit per table wins.
            $newest_time_of{$table} = $time_flag unless(exists $newest_time_of{$table});
        }
    }
    return %newest_time_of;
}
# Load the index information of the database into %ALL_INDEX_MAP.
# Every key is "schema.table"; the value is an array ref holding one [type, name, definition]
# entry per index, in the order the query returns them.
sub getIndexInfo{
    for my $row(queryResult($GET_INDEX_INFO_LIST_SQL)){
        my ($scma,$rel,$type,$index,$def) = @$row;
        # All columns except the type flag arrive base64 encoded.
        ($scma,$rel,$index,$def) = (decode($scma),decode($rel),decode($index),decode($def));
        # Append in place: the per-table list is created on first use, so the whole list
        # no longer has to be copied for every row.
        push @{$ALL_INDEX_MAP{$scma.".".$rel}},[$type,$index,$def];
    }
}
# Queue the statements needed to drop and later recreate the indexes of one table.
#   $scma, $rel - schema and name of the table.
# For every index known for the table, its definition is appended to @INDEX_CREATE_LIST and a
# matching drop statement to @INDEX_DROP_LIST: ALTER TABLE ... DROP CONSTRAINT for type 'p',
# DROP INDEX otherwise. Type 'p' entries whose name contains "_1_prt_" are skipped with a warning.
# DROP INDEX is schema-qualified with the table's schema (an index lives in the schema of its
# table), so it also works for schemas outside the search_path; embedded double quotes in
# identifiers are doubled.
sub checkTableIndex{
    my ($scma,$rel) = @_;
    my $table = $scma.".".$rel;
    return if(!exists $ALL_INDEX_MAP{$table});
    # Quote an SQL identifier.
    my $quote_ident = sub{
        my ($name) = @_;
        $name =~ s/"/""/g;
        return '"'.$name.'"';
    };
    for my $record(@{$ALL_INDEX_MAP{$table}}){
        my ($type,$index,$def) = @$record;
        if($type eq "p" && $index =~ /_1_prt_/){
            logMessage("WARN","Table $table PKEY $index can not drop for partition table");
            next;
        }
        push @INDEX_CREATE_LIST,$def;
        if($type eq "p"){
            push @INDEX_DROP_LIST,"ALTER TABLE ".$quote_ident->($scma).".".$quote_ident->($rel)." DROP CONSTRAINT ".$quote_ident->($index);
        }else{
            push @INDEX_DROP_LIST,"DROP INDEX ".$quote_ident->($scma).".".$quote_ident->($index);
        }
    }
}
# Attach the backup time to every table in @TABLE_ARRAY.
# Tables without a backup record are dropped from the list with a warning. For the others
# the entry becomes [schema, relation, condition, time flag] and their indexes are queued
# through checkTableIndex().
sub processLastBackupStat{
    my %backup_time_of = getLastBackupStatFromFile();
    getIndexInfo();
    my @restorable = ();
    for my $entry(@TABLE_ARRAY){
        my ($scma,$rel,$where) = @$entry;
        my $table = "$scma.$rel";
        unless(exists $backup_time_of{$table}){
            logMessage("WARN","No backup record found with table name: $table");
            next;
        }
        push @restorable,[$scma,$rel,$where,$backup_time_of{$table}];
        checkTableIndex($scma,$rel);
    }
    @TABLE_ARRAY = @restorable;
}
# Drop the queued indexes before the data is loaded.
# Does nothing when @INDEX_DROP_LIST is empty. Otherwise the create statements and the drop
# statements are appended to two time-stamped files in the master log path, and the drop file
# is run through psql with its output going to a matching .log file. On success the time stamp
# is kept in $INDEX_FILE_TIME; a failure is only logged as a warning.
sub processIndexDropAndLog{
    return if(@INDEX_DROP_LIST < 1);
    my $stamp = strftime("%Y%m%d%H%M%S",localtime);
    my $base_name = "$MASTER_LOG_PATH/$DATABASE_NAME.index";
    my $index_create_file = "$base_name.create.$stamp.txt";
    my $index_drop_file = "$base_name.drop.$stamp.txt";
    my $index_drop_log = "$base_name.drop.$stamp.log";
    logMessage("WARN","Some index should be drop before restore data from file");
    logMessage("WARN","You can recreate it use index definition file:\n  ==>>  ".$index_create_file);
    logMessage("INFO","Reference the log file to get the index delete information:\n  ==>>  $index_drop_log");
    # Append a list of statements to a file, one per line, each ended with ';'.
    my $append_statements = sub{
        my ($path,$statements) = @_;
        my $handle;
        if(!open($handle,">>",$path)){
            errorMessage("Can't open file: $path");
        }
        $handle->autoflush(1);
        for my $statement(@$statements){
            print {$handle} $statement.";\n";
        }
        close $handle;
    };
    $append_statements->($index_create_file,\@INDEX_CREATE_LIST);
    $append_statements->($index_drop_file,\@INDEX_DROP_LIST);
    system("PGDATABASE=$DATABASE_NAME PGPORT=$PORT psql -qtAX -v ON_ERROR_STOP=1 -f $index_drop_file > $index_drop_log 2>&1");
    if($? >> 8){
        logMessage("WARN","Drop index occur error: $index_drop_file");
    }else{
        $INDEX_FILE_TIME = $stamp;
    }
}
# Make sure the schema "gpmcrestore" exists in the target database.
#
# Counts the matching rows of pg_namespace and, when the count is zero, logs
# a NOTICE and creates the schema.
sub checkRestoreSchema{
    my $found = queryResult("SELECT count(*) from pg_namespace where nspname='gpmcrestore';","Scalar");
    return unless 0 == $found;
    logMessage("NOTICE","No schema named gpmcrestore in database [$DATABASE_NAME], create it.");
    queryResult("create schema gpmcrestore;");
}
# Worker thread body: restore tables taken from $TASK_QUEUE until an undef
# task is dequeued.
#
# For every task [schema, relation, where, time] the worker builds one SQL
# script that re-creates its own external web table
# (gpmcrestore.restore_<thread index>), optionally truncates the target table
# and inserts the external table's rows into it. Progress and results are
# reported through $MSG_QUEUE as shared arrays:
#   ("INFO", text)                      before a table is restored
#   ("SUCCESS"|"FAILED", text, name)    after the query finished
# A final undef is enqueued so the message thread can count finished workers.
#
# Parameters:
#   $thread_index - index of this worker, used in the external table name
sub executeRestore{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($thread_index) = @_;
    my $ext_table = 'gpmcrestore.restore_'.$thread_index;
    # Without an explicit directory the literal text $GP_SEG_DATADIR is passed
    # on and the "self path" flag is 0.
    my ($backup_path,$self_path) = "" eq $DIRECTORY ? ('$GP_SEG_DATADIR',0) : ($DIRECTORY,1);
    my $truncate_info = $NEED_TRUNCATE ? " with truncate" : "";
    while(defined(my $task = $TASK_QUEUE->dequeue())){
        my ($scma,$rel,$where,$time) = @$task;
        my $table = $scma.".".$rel;
        my $full_name = $DATABASE_NAME.".".$table;
        my $back_tbnm = $BACKUP_DB.".".$table;
        # The names above keep the raw relation name; only the SQL text below
        # uses the "$"-escaped form.
        $rel =~ s/\$/\\\\\$/g;
        my $filter = "" eq $where ? $GLOBAL_CONDITION : " where ".$where;
        my $target = qq{"$scma"."$rel"};
        my @statements = (
            qq{drop external table if exists $ext_table;\n},
            qq{create external web table $ext_table(like $target)\n},
            "execute E'sh /tmp/fifo/gpmcrestorecat.sh $backup_path $time \$GP_SEG_PORT $back_tbnm $self_path' on ALL format 'text' encoding '$ENCODING';\n",
        );
        if($NEED_TRUNCATE){
            push @statements,qq{truncate table $target;\n};
        }
        push @statements,qq{insert into $target select * from $ext_table}.$filter.";";
        my $query = join("",@statements);
        my @info_msg :shared = ("INFO","Start restore table $full_name".$truncate_info);
        $MSG_QUEUE->enqueue(\@info_msg);
        my $start = time();
        my ($code,$value) = queryResult($query,"CV");
        my $duration = time() - $start;
        my @stat_msg :shared;
        if($code eq 0){
            # Take the third space-separated field of the last output line
            # that starts with INSERT.
            for my $line(grep {/^INSERT/} split(/\n/,$value)){
                $value = (split(/ /,$line))[2];
            }
            @stat_msg = ("SUCCESS"," ROWS: $value TIME: $duration S TIME: $time",$full_name);
        }else{
            @stat_msg = ("FAILED"," ".$value,$full_name);
        }
        $MSG_QUEUE->enqueue(\@stat_msg);
    }
    $MSG_QUEUE->enqueue(undef);
}
# Replace the record of one table in $MASTER_DONE_FILE with a fresh one.
#
# Records have the form "<base64 name>;<full name>". Existing records with the
# same base64 prefix are removed and the new record is appended. The file is
# rewritten through a temporary file and renamed into place.
#
# Parameters:
#   $base64_name - base64 text of the table name, without newlines
#   $full_name   - plain table name stored after the ";"
# Returns 1 on success, 0 when the file could not be rewritten.
sub _recordDoneTable{
    my ($base64_name,$full_name) = @_;
    my $record_prefix = $base64_name.";";
    my @kept_records;
    my $DONE_IN;
    # A missing done file is not an error: it is simply created below.
    if(open($DONE_IN,"<",$MASTER_DONE_FILE)){
        while(my $line = <$DONE_IN>){
            chomp $line;
            # Literal prefix test: base64 text may contain "/" and "+", which
            # must never be interpreted as pattern syntax.
            next if index($line,$record_prefix) == 0;
            push @kept_records,$line."\n";
        }
        close $DONE_IN;
    }
    my $temp_file = $MASTER_DONE_FILE.".tmp";
    my $DONE_OUT;
    if(!open($DONE_OUT,">",$temp_file)){
        return 0;
    }
    my $written = print {$DONE_OUT} @kept_records,$record_prefix.$full_name."\n";
    my $closed = close $DONE_OUT;
    if($written && $closed && rename($temp_file,$MASTER_DONE_FILE)){
        return 1;
    }
    unlink $temp_file;
    return 0;
}
# Message thread body: consume $MSG_QUEUE, log every message and keep the
# success/failure counters.
#
# Queue items are array refs [type, text, full name]. "SUCCESS" items are
# also recorded in $MASTER_DONE_FILE. An undef item marks one finished worker;
# the loop ends once $BATCH_SIZE of them have been seen.
#
# Returns the number of "FAILED" messages received.
sub executeMessage{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($end_index,$error_index,$success_index) = (0,0,0);
    my $tables_size = @TABLE_ARRAY;
    my $msg = $MSG_QUEUE->dequeue();
    while(1){
        if(defined $msg){
            my ($type,$text,$full_name) = @$msg;
            if("SUCCESS" eq $type){
                $success_index += 1;
                my $base64_name = encode($full_name);
                $base64_name =~ s/\n//g;
                if(!_recordDoneTable($base64_name,$full_name)){
                    logMessage("WARN","Can't update done file: $MASTER_DONE_FILE");
                }
                logMessage($type,"($success_index/$error_index/$tables_size) $full_name".$text);
            }elsif("FAILED" eq $type){
                $error_index += 1;
                logMessage($type,"($success_index/$error_index/$tables_size) $full_name".$text);
            }else{
                logMessage($type,$text);
            }
        }else{
            $end_index += 1;
            # Numeric comparison: a batch size written as e.g. "05" must still
            # match, otherwise this loop would block on the queue forever.
            if($end_index == $BATCH_SIZE){
                last;
            }
        }
        $msg = $MSG_QUEUE->dequeue();
    }
    return $error_index;
}
# Run the restore of all tables in @TABLE_ARRAY with $BATCH_SIZE worker
# threads plus one message thread.
#
# When there is nothing to restore, exitMain() is called with 11 if
# $FORCE_REDO is set and with 0 otherwise. Otherwise every table entry is
# copied into a shared array and queued, one undef terminator is queued per
# worker, and the workers and the message thread are started and joined.
#
# Returns the value returned by the message thread (executeMessage).
sub startRestore{
    my $tables_size = scalar(@TABLE_ARRAY);
    if($tables_size != 0){
        logMessage("INFO","Number of tables should be restore is: $tables_size");
    }else{
        logMessage("INFO","No table will be restore, exit");
        exitMain($FORCE_REDO ? 11 : 0);
    }
    $TASK_QUEUE = Thread::Queue->new();
    $MSG_QUEUE = Thread::Queue->new();
    for my $entry(@TABLE_ARRAY){
        my @shared_entry :shared = @$entry;
        $TASK_QUEUE->enqueue(\@shared_entry);
    }
    for my $worker_index(0 .. $BATCH_SIZE - 1){
        # One terminator per worker, queued behind all real tasks.
        $TASK_QUEUE->enqueue(undef);
        my $worker = threads->new(\&executeRestore,$worker_index);
        push @TASK_THREAD,$worker;
    }
    $MSG_THREAD = threads->new(\&executeMessage);
    $_->join() for @TASK_THREAD;
    @TASK_THREAD = ();
    my $error_total = $MSG_THREAD->join();
    $MSG_THREAD = "";
    return $error_total;
}
# Re-create the indexes that were dropped before the restore.
#
# Only acts when $INDEX_FILE_TIME is non-empty, i.e. when
# processIndexDropAndLog() ran the drop file successfully. Runs the matching
# index create file through psql with ON_ERROR_STOP and logs the outcome and
# the location of the psql log file.
sub processIndexRecreate{
    if("" ne $INDEX_FILE_TIME){
        logMessage("INFO","Try to recreate index......");
        my $index_create_file = "$MASTER_LOG_PATH/$DATABASE_NAME.index.create.$INDEX_FILE_TIME.txt";
        my $index_create_log = "$MASTER_LOG_PATH/$DATABASE_NAME.index.create.$INDEX_FILE_TIME.log";
        my $command = "PGDATABASE=$DATABASE_NAME PGPORT=$PORT psql -qtAX -v ON_ERROR_STOP=1 -f $index_create_file > $index_create_log 2>&1";
        system($command);
        # Test the whole wait status, not only the exit byte ($? >> 8): a psql
        # that was killed by a signal has exit byte 0 and is not a success.
        if($? == 0){
            logMessage("INFO","Success recreate index from file: $index_create_file");
        }else{
            logMessage("WARN","Recreate index occur error: $index_create_log");
        }
        logMessage("INFO","Recreate index log file: $index_create_log");
    }
}
# Entry point of the restore run.
#
# Parameters:
#   $_[0] - the command line as one string, only used for logging
#
# Runs the preparation steps in order, starts the threaded restore and ends
# through exitMain(): with 33 when startRestore() reported failed tables,
# with 0 after a fully successful restore and index re-creation.
sub main{
    # set_thread_exit_only() is missing from old versions of threads.pm.
    eval{threads->set_thread_exit_only(1);};
    if($@){
        errorMessage("Perl version is too old for multi threads.");
    }
    getOption();
    logMessage("INFO","Start restore process".("." x 66));
    logMessage("INFO","Run command: ".$_[0]);
    checkOption();
    processBackupTimeAndDataDirectory();
    # Remove PGOPTIONS from this process so that every psql started later does
    # not inherit it. Running "unset" through system() would only change a
    # short-lived child shell and leave this environment untouched.
    delete $ENV{PGOPTIONS};
    checkLanguage();
    checkExecuteFunction();
    createCleverCommand();
    restoreDdl();
    processTableArray();
    processLastBackupStat();
    processIndexDropAndLog();
    checkRestoreSchema();
    my $error_count = startRestore();
    if($error_count != 0){
        logMessage("INFO","Finish restore with failed......");
        exitMain(33);
    }else{
        logMessage("INFO","Finish data restore success......");
        processIndexRecreate();
        logMessage("INFO","Finish restore with all success......");
        exitMain(0);
    }
}
# Script entry: rebuild the command line for logging, make both standard
# streams unbuffered and hand over to main().
my $command_string = sprintf("%s %s",$0,join(" ",@ARGV));
for my $stream(\*STDOUT,\*STDERR){
    $stream->autoflush(1);
}
main($command_string);
